package com.my.service.task;

import com.my.service.task.entity.UserSex;
import com.my.service.task.source.UserSexSource;
import com.my.service.task.util.HbaseUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;


/**
 * Demo entry point (for demonstration purposes only).
 *
 * <p>The streaming job this class is named after is currently commented out inside
 * {@link #main(String[])}; the active code only performs a single lookup through
 * {@code HbaseUtils.getdata} and prints whatever that call returns.
 */
public class RealTimeNewUserCount {

    /**
     * Fetches one value via {@code HbaseUtils.getdata} and writes it to standard output.
     *
     * @param args command-line arguments; not read by the active code
     * @throws Exception declared so that anything raised by the lookup propagates to the caller
     */
    public static void main(String[] args) throws Exception {
        // ---- Disabled streaming pipeline, kept for reference only ----
        // It mapped each UserSex record to (userSex, timeString, 1), keyed by the first two
        // fields, summed the third over 5-second windows, printed the result and wrote it
        // through HbaseUtils.putdata.
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setParallelism(1);
//        // the source continuously reads newly added users from a custom loop
//        DataStreamSource<UserSex> rawStream = env.addSource(new UserSexSource());
//        SingleOutputStreamOperator<Tuple3<String,String, Integer>> mapStream = rawStream.flatMap(new FlatMapFunction<UserSex, Tuple3<String,String, Integer>>() {
//            @Override
//            public void flatMap(UserSex userSex, Collector<Tuple3<String,String, Integer>> collector) throws Exception {
//                collector.collect(Tuple3.of(userSex.getUserSex(),userSex.getTimeString(),1));
//            }
//        });
//        SingleOutputStreamOperator<Tuple3<String,String, Integer>> finalStream = mapStream.keyBy(0,1).timeWindow(Time.seconds(5)).sum(2);
//        finalStream.print();
//        finalStream.addSink(new SinkFunction<Tuple3<String, String, Integer>>() {
//            @Override
//            public void invoke(Tuple3<String, String, Integer> value, Context context) throws Exception {
//                String tablename = "userflaginfo";
//                String rowkey = value.f1;
//                String famliyname = "totalInfo";
//                String colum = value.f0;
//                HbaseUtils.putdata(tablename, rowkey, famliyname, colum, value.f2 + "");
//            }
//        });
//        env.execute("totalUserAnalysis");
//        }

        // ---- Active demo: one hard-coded lookup ----
        // NOTE(review): presumably these are the HBase table, row key, column family and
        // column qualifier, in that order -- confirm against HbaseUtils.getdata.
        final String table = "userflaginfo";
        final String rowKey = "123";
        final String columnFamily = "total";
        final String qualifier = "ss";

        System.out.println(HbaseUtils.getdata(table, rowKey, columnFamily, qualifier));
    }
}
